Wrangling over-sized data
A gentle intro

with Amazon S3, Apache Arrow and dplyr


Cesaire Tobias, Data Nerd

2023-02-23


  https://github.com/ces0491

Overview


  • Intro
  • Meaningful analytics
  • Working with over-sized data
  • What are Arrow, S3 and dplyr?
  • Example workflow
  • Questions

Intro


Intro



  • R enthusiast

  • Learned to code as a quant working in asset management

  • Used data analytics as a path to working in management consulting, tech and e-commerce

  • Did most of my early analysis in Excel and transitioned to R

  • Added SQL, Python and Cloud technologies to my toolkit

  • Passionate about making data analytics meaningful

  • Timely, Useful and Correct (TUC)

Meaningful analytics


Big healthcare data is incredibly powerful, but its Achilles heel is time. Its value is in the moment and its importance decreases exponentially with time, which makes critically important the rapid response and concerted effort to process collected clinical information.

- Volume and Value of Big Healthcare Data, Journal of Medical Statistics and Informatics, 2016

The value decay of data


Figure: parallels between the growth in size and the decay in value of large heterogeneous datasets. The horizontal axis represents time; the vertical axis shows the value of data. As we acquire more data at an ever faster rate, its size and value increase exponentially (black curve). The color curves indicate the exponential decay of the value of data from the point of its fixation (becoming static).

Working with over-sized data

  • Relational databases are still around and hugely popular, but
  • data - and local files in particular - are getting bigger
  • Additionally, many data warehouses/data lakes use flat-file storage (.csv, .parquet, .json etc.) - there are query engines in many environments, but you can often still end up with large extracts.

So, how do you work with data extracts that aren’t already in a database, and are bigger than your memory?

How can we perform EDA on these over-sized datasets in a familiar environment that allows us to quickly realise value from our data?

What are Arrow, S3 and dplyr?

Apache Arrow is a language-agnostic framework that combines the benefits of columnar data structures with in-memory computing. It is a highly standardised and performant framework.

Amazon Simple Storage Service is an object storage service offering industry-leading scalability, data availability, security, and performance.

One of the core packages of the tidyverse in the R programming language, dplyr is primarily a set of functions designed to enable dataframe manipulation in an intuitive, user-friendly way.
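As a taste of that style, here is a minimal sketch on the built-in mtcars data (nothing taxi-specific, purely illustrative):

```r
library(dplyr)

# average fuel efficiency per cylinder count, most efficient first
mtcars |>
  group_by(cyl) |>
  summarise(avg_mpg = mean(mpg)) |>
  arrange(desc(avg_mpg))
```

The same verbs - filter, select, group_by, summarise - are exactly what we will point at the taxi dataset later, unchanged.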

Over-sized data wrangling example

There are great examples of data analysis on genuinely big data (2 billion rows) in the Arrow docs.

For today, I’m going to focus on a biggish (but manageable) subset of the NYC taxi data.


My sample is ~20 GB on disk, split across 36 separate Parquet files partitioned by year and month:

./data/nyc-taxi
├── year=2009
│   ├── month=1
│   │   └── part-0.parquet
│   ├── month=10
│   │   └── part-0.parquet
│   ├── month=11
│   │   └── part-0.parquet
│   ├── month=12
│   │   └── part-0.parquet
│   ├── month=2
│   │   └── part-0.parquet
│   ├── month=3
│   │   └── part-0.parquet
│   ├── month=4
│   │   └── part-0.parquet
│   ├── month=5
│   │   └── part-0.parquet
│   ├── month=6
│   │   └── part-0.parquet
│   ├── month=7
│   │   └── part-0.parquet
│   ├── month=8
│   │   └── part-0.parquet
│   └── month=9
│       └── part-0.parquet
├── year=2010
│   ├── month=1
│   │   └── part-0.parquet
│   ├── month=10
│   │   └── part-0.parquet
│   ├── month=11
│   │   └── part-0.parquet
│   ├── month=12
│   │   └── part-0.parquet
│   ├── month=2
│   │   └── part-0.parquet
│   ├── month=3
│   │   └── part-0.parquet
│   ├── month=4
│   │   └── part-0.parquet
│   ├── month=5
│   │   └── part-0.parquet
│   ├── month=6
│   │   └── part-0.parquet
│   ├── month=7
│   │   └── part-0.parquet
│   ├── month=8
│   │   └── part-0.parquet
│   └── month=9
│       └── part-0.parquet
└── year=2011
    ├── month=1
    │   └── part-0.parquet
    ├── month=10
    │   └── part-0.parquet
    ├── month=11
    │   └── part-0.parquet
    ├── month=12
    │   └── part-0.parquet
    ├── month=2
    │   └── part-0.parquet
    ├── month=3
    │   └── part-0.parquet
    ├── month=5
    ├── month=6
    ├── month=7
    ├── month=8
    └── month=9


So not THAT big, but big enough to be a hassle to analyse with your standard desktop toolkit.
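As an aside, the Hive-style year=/month= layout above is exactly what arrow::write_dataset() produces when asked to partition. A minimal sketch on toy data (the path and column names here are illustrative, not the actual taxi preparation script):

```r
library(arrow)

# toy trips table containing the partition columns
trips <- data.frame(
  year  = c(2009, 2009, 2010),
  month = c(1, 2, 1),
  fare  = c(10.5, 7.2, 12.0)
)

# writes one part-0.parquet file per year=/month= combination
write_dataset(trips, "./data/toy-taxi", partitioning = c("year", "month"))

list.files("./data/toy-taxi", recursive = TRUE)
```

Partitioning matters because later filters on year and month can skip whole files instead of scanning everything.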

Workflow - Reading the data


First things first, we need to read in our data.

The data lives in an S3 bucket created by Voltron Data and is freely accessible:

bucket_name <- "voltrondata-labs-datasets"
dataset_name <- "nyc-taxi"
fs_path <- paste(bucket_name, dataset_name, sep = "/")

bucket <- arrow::s3_bucket(fs_path)
datatables <- bucket$ls(recursive = TRUE)


  • We can connect to large datasets in the cloud but don’t need to be familiar with cloud querying tools.
  • Data becomes increasingly democratised as more analysts can work with the data without leaving their familiar tools and without needing any additional setup.
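Incidentally, you don’t strictly need to download anything: open_dataset() also accepts the bucket connection itself and streams the files over the network. A sketch using the bucket object from above (fine for light queries, but slower than the local copy used on the next slides):

```r
# query the dataset in place on S3 - no local download required
nyc_taxi_s3 <- arrow::open_dataset(bucket)
```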

Workflow - Opening the data


Next we open the dataset - not quite the same as loading the data.

nyc_taxi <- arrow::open_dataset('./data/nyc-taxi')
format(nrow(nyc_taxi), big.mark = ",")  # producing call assumed - not shown on the original slide
[1] "428,790,955"

Our data is 428 million rows

lobstr::obj_size(nyc_taxi)
261.59 kB

But it occupies less than 0.5 MB in our local environment!

Workflow - Wrangling the data


As we saw previously, our data isn’t loaded into R (as it would be for e.g. a data.frame); it is a FileSystemDataset object that reads files as needed. These files can still be wrangled with dplyr for efficient EDA.

trip_data <- nyc_taxi |> 
  dplyr::filter(year == 2010) |>
  dplyr::select(matches("pickup"), matches("dropoff"))

trip_data
FileSystemDataset (query)
pickup_datetime: timestamp[ms]
pickup_longitude: double
pickup_latitude: double
pickup_location_id: int64
dropoff_datetime: timestamp[ms]
dropoff_longitude: double
dropoff_latitude: double
dropoff_location_id: int64

* Filter: (year == 2010)
See $.data for the source Arrow object

The output is an unevaluated query, so we need to use collect() to evaluate it and return the result to R.

# collect just the first rows - collecting all of 2010 at once would be slow
trip_data |>
  head() |>
  dplyr::collect()
# A tibble: 6 × 8
  pickup_datetime     pickup_longi…¹ picku…² picku…³ dropoff_datetime    dropo…⁴
  <dttm>                       <dbl>   <dbl>   <int> <dttm>                <dbl>
1 2010-11-01 07:13:00          -74.0    40.7      NA 2010-11-01 07:13:00   -74.0
2 2010-11-05 02:44:00          -74.0    40.7      NA 2010-11-05 02:59:00   -74.0
3 2010-11-05 21:01:00          -74.0    40.8      NA 2010-11-05 21:28:00   -74.0
4 2010-11-05 21:13:00          -74.0    40.8      NA 2010-11-05 21:17:00   -74.0
5 2010-11-05 21:09:00          -73.9    40.8      NA 2010-11-05 21:22:00   -74.0
6 2010-11-05 21:06:00          -74.0    40.8      NA 2010-11-05 21:19:00   -74.0
# … with 2 more variables: dropoff_latitude <dbl>, dropoff_location_id <int>,
#   and abbreviated variable names ¹​pickup_longitude, ²​pickup_latitude,
#   ³​pickup_location_id, ⁴​dropoff_longitude

Workflow - Presenting the data

from a FileSystemDataset object

distance_fs <- nyc_taxi |>
  dplyr::select(vendor_name, trip_distance, month) |>
  dplyr::group_by(month, vendor_name) |>
  dplyr::summarize(distance = sum(trip_distance, na.rm = TRUE))


To a tibble
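The slide jumps straight to the formatted tibble; the intermediate step that builds distance_df_fmt (used by the chart on the next slide) isn’t shown. A sketch of what it presumably looked like - the year 2010 and the helper column names are inferred from the output below:

```r
# collect the aggregated result into R, then add display columns
distance_df_fmt <- distance_fs |>
  dplyr::collect() |>
  dplyr::mutate(
    date         = as.Date(paste(2010, month, 1, sep = "-")),
    month_name   = month.name[month],
    distance_str = format(distance, big.mark = ",")
  )
```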

# A tibble: 6 × 6
# Groups:   month [2]
  month vendor_name  distance date       month_name distance_str
  <int> <chr>           <dbl> <date>     <chr>      <chr>       
1     1 VTS         55479602. 2010-01-01 January    "55,479,602"
2     1 DDS          4775573. 2010-01-01 January    " 4,775,573"
3     1 CMT         50651737. 2010-01-01 January    "50,651,737"
4    11 VTS         61108477. 2010-11-01 November   "61,108,477"
5    11 CMT         56155304. 2010-11-01 November   "56,155,304"
6    11 DDS          2507956. 2010-11-01 November   " 2,507,956"

Workflow - Presenting the data

To charts

p <- ggplot2::ggplot(data = distance_df_fmt, ggplot2::aes(x = date, y = distance, color = vendor_name)) +
  ggplot2::geom_line() +
  ggplot2::scale_y_continuous(labels = scales::label_number(scale = 1e-6, accuracy = 1)) +
  ggplot2::scale_x_date(date_labels = "%b") +
  ggplot2::ylab("distance (millions of miles)") +
  ggplot2::ggtitle("Total Distance Travelled per Vendor, per Month") +
  ggplot2::theme_minimal()

plotly::ggplotly(p, width = 1600, height = 400)

And beyond - we can then use any function in R to further explore, analyse or model our data.

Bonus - Modelling


To randomly sample our data, we can use map_batches to take a small fraction of rows from each batch:

sampled_data <- nyc_taxi |>
  dplyr::filter(year == 2010) |>
  dplyr::mutate(tip_pct = tip_amount / total_amount)|>
  dplyr::select(year, tip_amount, total_amount, tip_pct, passenger_count) |>
  arrow::map_batches(~ arrow::as_record_batch(dplyr::sample_frac(as.data.frame(.), 1e-4))) |>
  dplyr::collect() # collect again because record_batch returns an arrow batch object

str(sampled_data)
tibble [14,794 × 5] (S3: tbl_df/tbl/data.frame)
 $ year           : int [1:14794] 2010 2010 2010 2010 2010 2010 2010 2010 2010 2010 ...
 $ tip_amount     : num [1:14794] 2.04 4 0 0 2 0 0 2 1 0 ...
 $ total_amount   : num [1:14794] 15.6 15.2 16.6 5.8 12.6 ...
 $ tip_pct        : num [1:14794] 0.13 0.263 0 0 0.159 ...
 $ passenger_count: int [1:14794] 1 1 1 1 1 2 1 1 1 3 ...

Fit a linear model to the sample data

model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled_data)
broom::glance(model)  # one-row model summary; producing call assumed - not shown on the slide
# A tibble: 1 × 12
  r.squa…¹ adj.r…²  sigma stati…³   p.value    df logLik     AIC     BIC devia…⁴
     <dbl>   <dbl>  <dbl>   <dbl>     <dbl> <dbl>  <dbl>   <dbl>   <dbl>   <dbl>
1   0.0306  0.0305 0.0811    234. 1.06e-100     2 16170. -32332. -32301.    97.3
# … with 2 more variables: df.residual <int>, nobs <int>, and abbreviated
#   variable names ¹​r.squared, ²​adj.r.squared, ³​statistic, ⁴​deviance

Bonus - Modelling (cont)


Then use map_batches again to compute the model’s mean squared error over the full 2010 dataset

mse <- nyc_taxi %>%
  dplyr::filter(year == 2010) %>%
  dplyr::select(tip_amount, total_amount, passenger_count) %>%
  dplyr::mutate(tip_pct = tip_amount / total_amount) %>%
  arrow::map_batches(function(batch) {
    batch %>%
      as.data.frame() %>%
      dplyr::mutate(pred_tip_pct = stats::predict(model, newdata = .)) %>%
      dplyr::filter(!is.nan(tip_pct)) %>%
      dplyr::summarize(sse_partial = sum((pred_tip_pct - tip_pct)^2), n_partial = dplyr::n()) %>%
      arrow::as_record_batch()
  }) %>%
  dplyr::summarize(mse = sum(sse_partial) / sum(n_partial)) %>%
  dplyr::pull(mse, as_vector = TRUE)
[1] 19.69328

Questions